package com.veromca.kafka.kafka_demo.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
	private String topic;
	public KafkaConsumer(String topic){
		super();
		this.topic = topic;
		
	}
	
	@Override
	public void run(){
		try {
			ConsumerConnector consumer = createConsumer();
			Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
			topicCountMap.put(topic, new Integer(1));
			Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
		        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  
		        System.out.println("stream：" +consumerMap.get(topic).get(0)); 
		        ConsumerIterator<byte[], byte[]> it = stream.iterator();  
		        while (it.hasNext()) {  
		        
		            System.out.println("receive：" + new String(it.next().message()));  
		        	//这里指的注意，如果没有下面这个语句的执行很有可能回从头来读消息的
		            consumer.commitOffsets();
		        }  
			
		} catch (Exception e) {
			e.printStackTrace();
            System.out.println("error：" +e.getMessage());  
		}
		
	}
	
	private ConsumerConnector createConsumer(){
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "192.168.1.113:2181");
		props.setProperty("group.id", "test3");	
		return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
	}
	
	public static void main(String[] args) {
		  KafkaConsumer consumerThread = new KafkaConsumer("test3");  
	      consumerThread.start();  
	}

}
